/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * <p>http://www.apache.org/licenses/LICENSE-2.0
 *
 * <p>Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing permissions and
 * limitations under the License.
 */
package pl.allegro.tech.hermes.consumers.consumer.batch;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/*
 *  CAUTION!
 *  This is a repackaged version of org.apache.kafka.clients.producer.internals.BufferPool with a few changes.
 *
 *  Most notable changes:
 *  - direct memory allocation
 *  - lack of kafka specific monitoring dependencies
 *
 *  Implementation was deliberately not refactored for easier comparison with the original.
 *
 * */

/**
 * A pool of direct ByteBuffers kept under a given memory limit. This class is fairly specific to
 * the needs of the producer. In particular it has the following properties:
 *
 * <ol>
 *   <li>There is a special "poolable size" and buffers of this size are kept in a free list and
 *       recycled
 *   <li>It is fair. That is all memory is given to the longest waiting thread until it has
 *       sufficient memory. This prevents starvation or deadlock when a thread asks for a large
 *       chunk of memory and needs to block until multiple buffers are deallocated.
 * </ol>
 *
 * <p>All mutable state ({@code free}, {@code waiters}, {@code availableMemory}) is only read or
 * written while holding the single internal {@link ReentrantLock}.
 */
public final class DirectBufferPool {

  private final long totalMemory;
  private final int poolableSize;
  private final boolean blockOnExhaustion;
  private final ReentrantLock lock;
  private final Deque<ByteBuffer> free;
  private final Deque<Condition> waiters;
  // memory that is neither handed out to callers nor sitting in the free list
  private long availableMemory;

  /**
   * Create a new buffer pool
   *
   * @param memory The maximum amount of memory that this buffer pool can allocate
   * @param poolableSize The buffer size to cache in the free list rather than deallocating
   * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If
   *     true the {@link #allocate(int)} call will block and wait for memory to be returned to the
   *     pool. If false {@link #allocate(int)} will throw an exception if the buffer is out of
   *     memory.
   */
  public DirectBufferPool(long memory, int poolableSize, boolean blockOnExhaustion) {
    this.poolableSize = poolableSize;
    this.blockOnExhaustion = blockOnExhaustion;
    this.lock = new ReentrantLock();
    this.free = new ArrayDeque<>();
    this.waiters = new ArrayDeque<>();
    this.totalMemory = memory;
    this.availableMemory = memory;
  }

  /**
   * Allocate a buffer of the given size. This method blocks if there is not enough memory and the
   * buffer pool is configured with blocking mode.
   *
   * <p>If the call fails (interruption while waiting, or a failure of the underlying direct
   * allocation) any memory already reserved for it is handed back to the pool and the next waiting
   * thread, if any, is woken up.
   *
   * @param size The buffer size to allocate in bytes
   * @return The buffer
   * @throws InterruptedException If the thread is interrupted while blocked
   * @throws IllegalArgumentException if size is negative, or larger than the total memory
   *     controlled by the pool (and hence we would block forever)
   * @throws BufferOverflowException if the pool is in non-blocking mode and size exceeds the free
   *     memory in the pool
   */
  public ByteBuffer allocate(int size) throws InterruptedException {
    // a negative size would otherwise *increase* availableMemory below before
    // ByteBuffer.allocateDirect rejects it, corrupting the pool accounting
    if (size < 0) {
      throw new IllegalArgumentException(
          "Attempt to allocate a negative number of bytes: " + size);
    }
    if (size > this.totalMemory) {
      throw new IllegalArgumentException(
          "Attempt to allocate "
              + size
              + " bytes, but there is a hard limit of "
              + this.totalMemory
              + " on memory allocations.");
    }

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
      // check if we have a free buffer of the right size pooled
      if (size == this.poolableSize && !this.free.isEmpty()) {
        return this.free.pollFirst();
      }

      // now check if the request is immediately satisfiable with the
      // memory on hand or if we need to block
      // (computed as long: free.size() * poolableSize can exceed Integer.MAX_VALUE)
      long freeListSize = (long) this.free.size() * this.poolableSize;
      if (this.availableMemory + freeListSize >= size) {
        // we have enough unallocated or pooled memory to immediately
        // satisfy the request; only reserve it here, the actual
        // allocation happens after the lock is released
        freeUp(size);
        this.availableMemory -= size;
      } else if (!this.blockOnExhaustion) {
        throw new BufferOverflowException();
      } else {
        // we are out of memory and will have to block
        buffer = awaitMemory(size);
      }
    } finally {
      // signal any additional waiters if there is more memory left over for them;
      // this also runs on failure so that an interrupted waiter hands over to the next one
      try {
        boolean memoryLeft = this.availableMemory > 0 || !this.free.isEmpty();
        if (memoryLeft && !this.waiters.isEmpty()) {
          this.waiters.peekFirst().signal();
        }
      } finally {
        this.lock.unlock();
      }
    }

    return buffer != null ? buffer : allocateReserved(size);
  }

  /**
   * Block until {@code size} bytes have been reserved for the calling thread or a pooled buffer of
   * exactly that size could be taken from the free list. Must be called with the lock held.
   *
   * @param size number of bytes the caller needs
   * @return a recycled buffer from the free list, or {@code null} when {@code size} bytes were
   *     reserved in {@code availableMemory} and the caller still has to allocate the buffer
   * @throws InterruptedException if the thread is interrupted while waiting; in that case the
   *     partially reserved memory is returned and the waiter is removed from the queue
   */
  private ByteBuffer awaitMemory(int size) throws InterruptedException {
    ByteBuffer buffer = null;
    int accumulated = 0;
    Condition moreMemory = this.lock.newCondition();
    this.waiters.addLast(moreMemory);
    try {
      // loop over and over until we have a buffer or have reserved
      // enough memory to allocate one
      while (accumulated < size) {
        moreMemory.await();

        // check if we can satisfy this request from the free list,
        // otherwise allocate memory
        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
          // just grab a buffer from the free list
          buffer = this.free.pollFirst();
          accumulated = size;
        } else {
          // we'll need to allocate memory, but we may only get
          // part of what we need on this iteration
          freeUp(size - accumulated);
          int got = (int) Math.min(size - accumulated, this.availableMemory);
          this.availableMemory -= got;
          accumulated += got;
        }
      }
      // success: the reservation now belongs to the caller, nothing to give back
      accumulated = 0;
    } finally {
      // on failure return whatever was reserved so far; in every case remove this
      // thread's condition so the next thread in line can start getting memory
      this.availableMemory += accumulated;
      this.waiters.remove(moreMemory);
    }
    return buffer;
  }

  /**
   * Allocate a direct buffer for memory that has already been subtracted from {@code
   * availableMemory}. Must be called without the lock held. If the allocation throws, the
   * reservation is rolled back and the first waiter (if any) is signalled.
   */
  private ByteBuffer allocateReserved(int size) {
    boolean allocated = false;
    try {
      ByteBuffer buffer = ByteBuffer.allocateDirect(size);
      allocated = true;
      return buffer;
    } finally {
      if (!allocated) {
        this.lock.lock();
        try {
          this.availableMemory += size;
          Condition next = this.waiters.peekFirst();
          if (next != null) {
            next.signal();
          }
        } finally {
          this.lock.unlock();
        }
      }
    }
  }

  /**
   * Attempt to ensure we have at least the requested number of bytes of memory for allocation by
   * deallocating pooled buffers (if needed). Must be called with the lock held.
   */
  private void freeUp(int size) {
    while (!this.free.isEmpty() && this.availableMemory < size) {
      ByteBuffer evicted = this.free.pollLast();
      this.availableMemory += evicted.capacity();
      // hand the evicted buffer to the same release helper deallocate() uses for
      // buffers that leave the pool, instead of just dropping the reference
      DirectBufferUtils.release(evicted);
    }
  }

  /**
   * Return buffers to the pool. If they are of the poolable size add them to the free list,
   * otherwise release them and mark the memory as free. The first waiting thread, if any, is
   * signalled afterwards.
   *
   * @param buffer The buffer to return
   * @param size The size of the buffer to mark as deallocated, note that this maybe smaller than
   *     buffer.capacity since the buffer may re-allocate itself during in-place compression
   */
  public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
      if (size == this.poolableSize && size == buffer.capacity()) {
        buffer.clear();
        this.free.add(buffer);
      } else {
        DirectBufferUtils.release(buffer);
        this.availableMemory += size;
      }
      Condition moreMem = this.waiters.peekFirst();
      if (moreMem != null) {
        moreMem.signal();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Return a buffer to the pool, marking its full capacity as deallocated.
   *
   * @param buffer The buffer to return
   */
  public void deallocate(ByteBuffer buffer) {
    deallocate(buffer, buffer.capacity());
  }

  /** the total free memory both unallocated and in the free list */
  public long availableMemory() {
    lock.lock();
    try {
      // widen before multiplying so a large free list cannot overflow int
      return this.availableMemory + (long) this.free.size() * this.poolableSize;
    } finally {
      lock.unlock();
    }
  }

  /** Get the unallocated memory (not in the free list or in use) */
  public long unallocatedMemory() {
    lock.lock();
    try {
      return this.availableMemory;
    } finally {
      lock.unlock();
    }
  }

  /** The number of threads blocked waiting on memory */
  public int queued() {
    lock.lock();
    try {
      return this.waiters.size();
    } finally {
      lock.unlock();
    }
  }

  /** The buffer size that will be retained in the free list after use */
  public int poolableSize() {
    return this.poolableSize;
  }

  /** The total memory managed by this pool */
  public long totalMemory() {
    return this.totalMemory;
  }
}
